package com.atguigu.flink.sql.function;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Created by Smexy on 2023/11/20


 */
public class Demo2_ScalarFunctionCustom
{
    public static void main(String[] args) {

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String createTableSql = "CREATE TABLE t1 (" +
            "  id STRING," +
            "  ts BIGINT," +
            "  vc INT " +
            ")  WITH (" +
            "  'connector' = 'filesystem',   " +
            "  'path' = 'data/ws2.json', " +
            "  'format' = 'json'             " +
            ")";

        tableEnv.executeSql(createTableSql);

        //创建函数对象
        MyUpper myFunction = new MyUpper();
        //为函数对象起个名字
        tableEnv.createTemporaryFunction("myUpper",myFunction);
        //SQL
        tableEnv.sqlQuery("select id,ts,vc,myUpper(id) upperId from t1")
                .execute()
                .print();

    }

    //如果字符串不是NULL，则转换为大写，否则返回字符串NULL
    public static class MyUpper extends ScalarFunction{

        //重写一个或多个名为eval()的方法，方法的参数列表可以自定义，但是必须有返回值。
        public String eval(String str){
            if (str == null){
                return "NULL";
            }else {
                return str.toUpperCase();
            }
        }
    }
}
